Add lambda support and array_transform udf#21679
|
this is pretty amazing -- I put a note to include it in the 55 release's notes: |
## Which issue does this PR close? - Closes #. Followup on #21679 (comment) --------- Co-authored-by: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
|
Many thanks to everyone involved, both the reviewers and those who showed interest in the feature. Reviewing such a big PR is not easy and I'm very grateful for it, thanks again ❤️ |
|
This is amazing. Thank you so much for all the hard work on this. This is definitely one of the things I am most excited about in the next release. This is going to be huge! |
|
I just wanted to bring to attention that DuckDB deprecated this very syntax because of conflicts with JSON operators. I guess we may want to support both long term (Spark uses the arrow syntax), but I think there's a real risk that we are not even evaluating incompatibility with JSON operators because they are not implemented by default in DataFusion (but we are in talks to do so #21301). Happy to open an issue for discussion but wanted to check first if this was discussed at all, as far as I can tell from going over the PR it has not. |
|
@adriangb I think this decision can be left to the user via the configurable dialect, as it is today. This PR merely consumes the LambdaFunction from the sqlparser-rs AST; which syntax gets parsed is defined by the configured dialect. It is up to sqlparser to avoid conflicts (see apache/datafusion-sqlparser-rs#2224). The sqllogictests here require setting the dialect to databricks, for example. I guess what we can do here is:
|
## Which issue does this PR close? No issue; this is a follow-up to apache#21679. ## Rationale for this change In `ScalarUDF`, arity is enforced by the framework via `TypeSignature`. In `HigherOrderUDF`, functions with a fixed number of value and lambda arguments had to use `UserDefined` and manually validate arity inside `coerce_value_types`, which is boilerplate that every implementor has to repeat. ## What changes are included in this PR? Adds a `HigherOrderTypeSignature::Exact { values: usize, lambdas: usize }` variant that enforces a fixed count of value and lambda arguments, so that `coerce_value_types` is called only for type coercion. ## Are these changes tested? Yes, I added some planning tests for the exact signature in `datafusion/expr/src/type_coercion/functions.rs`. ## Are there any user-facing changes? Yes, a new signature for `HigherOrderSignature` was added.
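To make the arity rule concrete, here is a minimal, dependency-free sketch of how an `Exact { values, lambdas }` signature could be checked. The enum below is a hypothetical stand-in that only borrows the variant name from the PR; `ArgKind` and `check_arity` are illustrative names and are not part of DataFusion.

```rust
/// Hypothetical stand-in for the signature enum described above.
#[derive(Debug, Clone, PartialEq)]
pub enum HigherOrderTypeSignature {
    /// The implementor validates arity and types itself.
    UserDefined,
    /// Exactly `values` value arguments and `lambdas` lambda arguments.
    Exact { values: usize, lambdas: usize },
}

/// Illustrative argument kind; only the kind matters for an arity check.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ArgKind {
    Value,
    Lambda,
}

/// Returns an error message when the call does not match an `Exact` signature.
pub fn check_arity(
    name: &str,
    signature: &HigherOrderTypeSignature,
    args: &[ArgKind],
) -> Result<(), String> {
    match signature {
        // Nothing to enforce here: the function checks its own arguments.
        HigherOrderTypeSignature::UserDefined => Ok(()),
        HigherOrderTypeSignature::Exact { values, lambdas } => {
            let got_values = args.iter().filter(|a| **a == ArgKind::Value).count();
            let got_lambdas = args.len() - got_values;
            if got_values == *values && got_lambdas == *lambdas {
                Ok(())
            } else {
                Err(format!(
                    "{name} expects {values} value and {lambdas} lambda argument(s), \
                     got {got_values} and {got_lambdas}"
                ))
            }
        }
    }
}
```

With a check like this in the framework, an implementor's coercion hook would only have to deal with types, which is the boilerplate reduction the rationale describes.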
## Which issue does this PR close? Follow up of apache#21679 ## Rationale for this change As noted by @LiaCastaneda in [apache#21679 (comment)](https://github.com/apache/datafusion/pull/21679/changes#r3152449038), the higher-order signature can be made less error prone by removing the need to set the `coerce_values_for_lambdas` field when `coerce_values_for_lambdas` should be called ## What changes are included in this PR? Remove `HigherOrderSignature.coerce_values_for_lambdas/with_coerce_values_for_lambdas` and change the return type of `HigherOrderUDF::coerce_values_for_lambdas` from `Result<Vec<DataType>>` to `Result<Option<Vec<DataType>>>`; its default implementation now returns `Ok(None)` instead of an error ## Are these changes tested? Existing tests cover the change ## Are there any user-facing changes? To unreleased items only
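The shape of that change can be sketched without DataFusion. Everything below is an assumption made for illustration: the trait name, the toy `DataType`, the method's parameter list, and the reading of `Ok(None)` as "no coercion requested" are not taken from the PR; only the `Result<Option<Vec<DataType>>>` return shape and the `Ok(None)` default are.

```rust
/// Toy data type, standing in for Arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int32,
    Int64,
    Float64,
}

/// Hypothetical trait mirroring only the return shape described above.
pub trait HigherOrderUdfSketch {
    /// Default: `Ok(None)`, read here as "keep the current types".
    fn coerce_values_for_lambdas(
        &self,
        _current: &[DataType],
    ) -> Result<Option<Vec<DataType>>, String> {
        Ok(None)
    }
}

/// Relies on the default, so no opt-in flag on a signature is needed.
pub struct NoCoercion;
impl HigherOrderUdfSketch for NoCoercion {}

/// Overrides the hook and widens `Int32` to `Int64`.
pub struct WidenToInt64;
impl HigherOrderUdfSketch for WidenToInt64 {
    fn coerce_values_for_lambdas(
        &self,
        current: &[DataType],
    ) -> Result<Option<Vec<DataType>>, String> {
        let widened = current
            .iter()
            .map(|t| match t {
                DataType::Int32 => DataType::Int64,
                other => other.clone(),
            })
            .collect();
        Ok(Some(widened))
    }
}

/// Caller side: fall back to the current types when the hook returns `None`.
pub fn resolve_types(
    udf: &dyn HigherOrderUdfSketch,
    current: &[DataType],
) -> Result<Vec<DataType>, String> {
    Ok(udf
        .coerce_values_for_lambdas(current)?
        .unwrap_or_else(|| current.to_vec()))
}
```

The point of the `Option` is that overriding the method is itself the signal, so there is no separate field that can fall out of sync with the implementation.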
) ## Which issue does this PR close? Follow-up of apache#21679 that forgot to add these methods to `SessionContext`
## Which issue does this PR close? Follow up of apache#21679
…e#21895) ## Which issue does this PR close? Partially addresses apache#14509 — implements `array_filter` / `list_filter`. ## Rationale for this change `array_transform` (apache#21679) added the first `HigherOrderUDF`. `array_filter` is the natural companion: filter array elements with a boolean lambda, matching Spark `filter` / DuckDB `list_filter` semantics. ## What changes are included in this PR? - New `HigherOrderUDF` `ArrayFilter` (`array_filter` / `list_filter` alias) - Boolean lambda per element; `true` keeps, `false`/null drops (matches Spark semantics) - Handles `List`, `LargeList`, sliced arrays, null sublists - Scalar predicate short-circuit (`x -> true` / `x -> false`) - No-copy fast path when nothing is filtered (skips `arrow::compute::filter`) - Shared HOF helpers extracted from `array_transform` into a common module (`value_lambda_pair`, `coerce_single_list_arg`, `single_list_lambda_parameters`, `extract_list_values`) - Shared unit test helpers for higher-order function tests ## Are these changes tested? - Unit tests: basic filter, multiple sublists, sliced arrays, null sublists, all-filtered-out, nothing-filtered (fast path), scalar true/false predicates - SQL logic tests in `array_filter.slt`: filter variants, `array_filter` + `array_transform` combinations, error cases ## Are there any user-facing changes? Yes — `array_filter(array, lambda)` and alias `list_filter(array, lambda)` are now available as SQL functions.
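The keep/drop rule described for `array_filter` can be illustrated on plain Rust vectors. This is a sketch of the semantics only, not of the Arrow-based implementation: `array_filter_sketch` is a hypothetical name, a `None` sublist models a null sublist (assumed here to stay null), and a predicate result of `None` models a null boolean.

```rust
/// Keeps an element only when the predicate yields `Some(true)`;
/// `Some(false)` and `None` (null) both drop it.
pub fn array_filter_sketch<T: Clone>(
    lists: &[Option<Vec<T>>],
    predicate: impl Fn(&T) -> Option<bool>,
) -> Vec<Option<Vec<T>>> {
    lists
        .iter()
        .map(|sublist| {
            // A null sublist is passed through untouched (assumption).
            sublist.as_ref().map(|values| {
                values
                    .iter()
                    .filter(|v| predicate(*v) == Some(true))
                    .cloned()
                    .collect()
            })
        })
        .collect()
}
```

A sublist whose elements are all rejected becomes an empty list rather than null, which corresponds to the "all-filtered-out" case listed in the unit tests.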
This is a clean version of apache#18921 to make it easier to review **this is a breaking change due to adding variants to the `Expr` enum, new methods on traits `Session`, `FunctionRegistry` and `ContextProvider` and a new arg on `TaskContext::new`** This PR adds support for lambdas and the `array_transform` function used to test the lambda implementation. Example usage: ```sql SELECT array_transform([2, 3], v -> v != 2); [false, true] -- arbitrarily nested lambdas are also supported SELECT array_transform([[[2, 3]]], m -> array_transform(m, l -> array_transform(l, v -> v*2))); [[[4, 6]]] ``` Note: column capture has been removed for now and will be added in a follow-on PR, see apache#21172 Some comments on code snippets of this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudo code 3 new `Expr` variants are added: `HigherOrderFunction`, owning a new trait `HigherOrderUDF`, which is like a `ScalarFunction`/`ScalarUDFImpl` with support for lambdas; `Lambda`, for the lambda body and its parameter names; and `LambdaVariable`, which is like `Column` but for lambda parameters. Their logical representations: ```rust enum Expr { // array_transform([2, 3], v -> v != 2) HigherOrderFunction(HigherOrderFunction), // v -> v != 2 Lambda(Lambda), // v, of the lambda body: v != 2 LambdaVariable(LambdaVariable), ... 
} // array_transform([2, 3], v -> v != 2) struct HigherOrderFunction { // global instance of array_transform pub func: Arc<dyn HigherOrderUDF>, // [Expr::ScalarValue([2, 3]), Expr::Lambda(v -> v != 2)] pub args: Vec<Expr>, } // v -> v != 2 struct Lambda { // ["v"] pub params: Vec<String>, // v != 2 pub body: Box<Expr>, } // v, of the lambda body: v != 2 struct LambdaVariable { // "v" pub name: String, // Field::new("", DataType::Int32, false) // Note: a follow-on PR will make this field optional // to free expr_api from specifying it beforehand, // and add a resolve_lambda_variables method to Expr, // similar to Expr::Placeholder, see apache#21172 pub field: FieldRef, pub spans: Spans, } ``` The example would be planned into a tree like this: ``` HigherOrderFunctionExpression name: array_transform children: 1. ListExpression [2,3] 2. LambdaExpression parameters: ["v"] body: BinaryExpression (!=) left: LambdaVariableExpression("v", Field::new("", Int32, false)) right: LiteralExpression("2") ``` The definitions of the physical counterparts: ```rust struct HigherOrderFunctionExpr { // global instance of array_transform fun: Arc<dyn HigherOrderUDF>, // "array_transform" name: String, // [LiteralExpr([2, 3]), LambdaExpr("v -> v != 2")] args: Vec<Arc<dyn PhysicalExpr>>, // [1], the positions in args that contain lambdas lambda_positions: Vec<usize>, // Field::new("", DataType::new_list(DataType::Boolean, false), false) return_field: FieldRef, config_options: Arc<ConfigOptions>, } struct LambdaExpr { // ["v"] params: Vec<String>, // v != 2 body: Arc<dyn PhysicalExpr>, } struct LambdaVariable { // Field::new("v", DataType::Int32, false) field: FieldRef, // 0, the first and only parameter, "v" index: usize, } ``` Note: For those who primarily want to check whether this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of `HigherOrderUDF` and 
the relevant `HigherOrderUDF` methods of the `array_transform` implementation, collapsed due to their size The added `HigherOrderUDF` trait is almost a clone of `ScalarUDFImpl`, with the exception of: 1. `return_field_from_args` and `invoke_with_args`, where now `args.args` is a list of enums with two variants: `Value` or `Lambda` instead of a list of values 2. the addition of `lambda_parameters`, which returns a `Field` for each parameter supported for every lambda argument based on the `Field` of the non-lambda arguments 3. the removal of `return_field` and the deprecated ones `is_nullable` and `display_name`. 4. it does not yet include analogues to the methods preimage, placement, evaluate_bounds, propagate_constraints, output_ordering and preserves_lex_ordering <details><summary>HigherOrderUDF</summary> ```rust trait HigherOrderUDF { /// Return the field of all the parameters supported by all the supported lambdas of this function /// based on the field of the value arguments. If a lambda supports multiple parameters, or if multiple /// lambdas are supported and some are optional, all should be returned, /// regardless of whether they are used on a particular invocation /// /// Tip: If you have a [`HigherOrderFunction`] invocation, you can call the helper /// [`HigherOrderFunction::lambda_parameters`] instead of this method directly /// /// [`HigherOrderFunction`]: crate::expr::HigherOrderFunction /// [`HigherOrderFunction::lambda_parameters`]: crate::expr::HigherOrderFunction::lambda_parameters /// /// Example for array_transform: /// /// `array_transform([2.0, 8.0], v -> v > 4.0)` /// /// ```ignore /// let lambda_parameters = array_transform.lambda_parameters(&[ /// Arc::new(Field::new("", DataType::new_list(DataType::Float32, false), false)), // the Field of the literal `[2.0, 8.0]` /// ])?; /// /// assert_eq!( /// lambda_parameters, /// vec![ /// // the lambda supported parameters, regardless of how many are actually used /// vec![ /// // the value being transformed /// 
Field::new("", DataType::Float32, false), /// ] /// ] /// ) /// ``` /// /// The implementation can assume that some other part of the code has coerced /// the actual argument types to match [`Self::signature`]. fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>>; fn return_field_from_args(&self, args: LambdaReturnFieldArgs) -> Result<FieldRef>; fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue>; // ... omitted methods that are similar in ScalarUDFImpl } /// An argument to a lambda function pub enum ValueOrLambda<V, L> { /// A value with associated data Value(V), /// A lambda with associated data Lambda(L), } /// Information about arguments passed to the function /// /// This structure contains metadata about how the function was called /// such as the type of the arguments, any scalar arguments and if the /// arguments can (ever) be null /// /// See [`HigherOrderUDF::return_field_from_args`] for more information pub struct LambdaReturnFieldArgs<'a> { /// The data types of the arguments to the function /// /// If argument `i` to the function is a lambda, it will be the field of the result of the /// lambda if evaluated with the parameters returned from [`HigherOrderUDF::lambda_parameters`] /// /// For example, with `array_transform([1], v -> v == 5)` /// this field will be `[ /// ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)), /// ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false)) /// ]` pub arg_fields: &'a [ValueOrLambda<FieldRef, FieldRef>], /// Is argument `i` to the function a scalar (constant)? /// /// If the argument `i` is not a scalar, it will be None /// /// For example, if a function is called like `array_transform([1], v -> v == 5)` /// this field will be `[Some(ScalarValue::List(...), None]` pub scalar_arguments: &'a [Option<&'a ScalarValue>], } /// Arguments passed to [`HigherOrderUDF::invoke_with_args`] when invoking a /// lambda function. 
pub struct HigherOrderFunctionArgs { /// The evaluated arguments and lambdas to the function pub args: Vec<ValueOrLambda<ColumnarValue, LambdaArgument>>, /// Field associated with each arg, if it exists /// For lambdas, it will be the field of the result of /// the lambda if evaluated with the parameters /// returned from [`HigherOrderUDF::lambda_parameters`] pub arg_fields: Vec<ValueOrLambda<FieldRef, FieldRef>>, /// The number of rows in the record batch being evaluated pub number_rows: usize, /// The return field of the lambda function returned /// (from `return_field_from_args`) when creating the /// physical expression from the logical expression pub return_field: FieldRef, /// The config options at execution time pub config_options: Arc<ConfigOptions>, } /// A lambda argument to a HigherOrderFunction pub struct LambdaArgument { /// The parameters defined in this lambda /// /// For example, for `array_transform([2], v -> -v)`, /// this will be `vec![Field::new("v", DataType::Int32, true)]` params: Vec<FieldRef>, /// The body of the lambda /// /// For example, for `array_transform([2], v -> -v)`, /// this will be the physical expression of `-v` body: Arc<dyn PhysicalExpr>, } impl LambdaArgument { /// Evaluate this lambda /// `args` should evaluate to the value of each parameter /// of the corresponding lambda returned in [HigherOrderUDF::lambda_parameters]. 
pub fn evaluate( &self, args: &[&dyn Fn() -> Result<ArrayRef>], ) -> Result<ColumnarValue> { let columns = args .iter() .take(self.params.len()) .map(|arg| arg()) .collect::<Result<_>>()?; let schema = Arc::new(Schema::new(self.params.clone())); let batch = RecordBatch::try_new(schema, columns)?; self.body.evaluate(&batch) } } ``` </details> <details><summary>array_transform lambda_parameters implementation</summary> ```rust impl HigherOrderUDF for ArrayTransform { fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>> { let list = if value_fields.len() == 1 { &value_fields[0] } else { return plan_err!( "{} function requires 1 value arguments, got {}", self.name(), value_fields.len() ); }; let field = match list.data_type() { DataType::List(field) => field, DataType::LargeList(field) => field, DataType::FixedSizeList(field, _) => field, _ => return plan_err!("expected list, got {list}"), }; // we don't need to check whether the lambda contains more than two parameters, // e.g. 
array_transform([], (v, i, j) -> v+i+j), as datafusion will do that for us let value = Field::new("", field.data_type().clone(), field.is_nullable()) .with_metadata(field.metadata().clone()); Ok(vec![vec![value]]) } } ``` </details> <details><summary>array_transform return_field_from_args implementation</summary> ```rust fn value_lambda_pair<'a, V: Debug, L: Debug>( name: &str, args: &'a [ValueOrLambda<V, L>], ) -> Result<(&'a V, &'a L)> { let [value, lambda] = take_function_args(name, args)?; let (ValueOrLambda::Value(value), ValueOrLambda::Lambda(lambda)) = (value, lambda) else { return plan_err!( "{name} expects a value followed by a lambda, got {value:?} and {lambda:?}" ); }; Ok((value, lambda)) } impl HigherOrderUDF for ArrayTransform { fn return_field_from_args( &self, args: HigherOrderReturnFieldArgs, ) -> Result<Arc<Field>> { let (list, lambda) = value_lambda_pair(self.name(), args.arg_fields)?; // lambda is the resulting field of executing the lambda body // with the parameters returned in lambda_parameters let field = Arc::new(Field::new( Field::LIST_FIELD_DEFAULT_NAME, lambda.data_type().clone(), lambda.is_nullable(), )); let return_type = match list.data_type() { DataType::List(_) => DataType::List(field), DataType::LargeList(_) => DataType::LargeList(field), DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size), other => plan_err!("expected list, got {other}")?, }; Ok(Arc::new(Field::new("", return_type, list.is_nullable()))) } } ``` </details> <details><summary>array_transform invoke_with_args implementation</summary> ```rust impl HigherOrderUDF for ArrayTransform { fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue> { let (list, lambda) = value_lambda_pair(self.name(), &args.args)?; let list_array = list.to_array(args.number_rows)?; // Fast path for fully null input array and also the only way to safely work with // a fully null fixed size list array as it can't be handled by remove_list_null_values 
below if list_array.null_count() == list_array.len() { return Ok(ColumnarValue::Array(new_null_array( args.return_type(), list_array.len(), ))); } // as per list_values docs, if list_array is sliced, list_values will be sliced too, // so before constructing the transformed array below, we must adjust the list offsets with // adjust_offsets_for_slice let list_values = list_values(&list_array)?; // by passing closures, lambda.evaluate can evaluate only those actually needed let values_param = || Ok(Arc::clone(&list_values)); // call the transforming lambda let transformed_values = lambda .evaluate(&[&values_param])? .into_array(list_values.len())?; let field = match args.return_field.data_type() { DataType::List(field) | DataType::LargeList(field) | DataType::FixedSizeList(field, _) => Arc::clone(field), _ => { return exec_err!( "{} expected ScalarFunctionArgs.return_field to be a list, got {}", self.name(), args.return_field ); } }; let transformed_list = match list_array.data_type() { DataType::List(_) => { let list = list_array.as_list(); // since we called list_values above which would return sliced values for // a sliced list, we must adjust the offsets here as otherwise they would be invalid let adjusted_offsets = adjust_offsets_for_slice(list); Arc::new(ListArray::new( field, adjusted_offsets, transformed_values, list.nulls().cloned(), )) as ArrayRef } DataType::LargeList(_) => { let large_list = list_array.as_list(); // since we called list_values above which would return sliced values for // a sliced list, we must adjust the offsets here as otherwise they would be invalid let adjusted_offsets = adjust_offsets_for_slice(large_list); Arc::new(LargeListArray::new( field, adjusted_offsets, transformed_values, large_list.nulls().cloned(), )) } DataType::FixedSizeList(_, value_length) => { Arc::new(FixedSizeListArray::new( field, *value_length, transformed_values, list_array.as_fixed_size_list().nulls().cloned(), )) } other => exec_err!("expected list, got 
{other}")?, }; Ok(ColumnarValue::Array(transformed_list)) } } ``` </details> <details><summary>How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example</summary> ```rust // this is called at sql planning let lambda_parameters = lambda_udf.lambda_parameters(&[ Field::new("", DataType::new_list(DataType::Int32, false), false), // the Field of the [2, 3] literal ])?; assert_eq!( lambda_parameters, vec![ // the parameters that *can* be declared on the lambda, and not only // those actually declared: the implementation doesn't need to care // about it vec![ Field::new("", DataType::Int32, false), // the list inner value ]] ); // this is called every time ExprSchemable is called on a HigherOrderFunction let return_field = array_transform.return_field_from_args(&LambdaReturnFieldArgs { arg_fields: &[ ValueOrLambda::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)), ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false)), // the return_field of the expression "v != 2" when "v" is of the type returned in lambda_parameters ], scalar_arguments // irrelevant })?; assert_eq!(return_field, Field::new("", DataType::new_list(DataType::Boolean, false), false)); let value = array_transform.evaluate(&HigherOrderFunctionArgs { args: vec![ ValueOrLambda::Value(List([2, 3])), ValueOrLambda::Lambda(LambdaArgument of `v -> v != 2`), ], arg_fields, // same as above number_rows: 1, return_field, // same as above config_options, // irrelevant })?; assert_eq!(value, BooleanArray::from([false, true])) ``` </details> <br> <br> A pair HigherOrderUDF/HigherOrderUDFImpl like ScalarFunction was not used because those exist only [to maintain backwards compatibility with the older API](https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html#api-note) </details> <br> Why `LambdaVariable` and not `Column`: Existing tree traversals that operate on columns would break if some column 
nodes referred to a lambda parameter and not a real column. In the example query, projection pushdown would try to push the lambda parameter "v", which won't exist in table "t". Example of code of another traversal that would break: ```rust fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter { let mut used_columns = HashSet::new(); expr.apply(|expr| { if let Some(col) = expr.as_any().downcast_ref::<Column>() { // if this is a lambda column, this function will break used_columns.insert(col.index()); } Ok(TreeNodeRecursion::Continue) }); ... } ``` Furthermore, the implementation of `ExprSchemable` and `PhysicalExpr::return_field` for `Column` expects that the schema it receives as an argument contains an entry for its name, which is not the case for lambda parameters. By including a `FieldRef` on `LambdaVariable` that should be resolved during construction time in the sql planner, `ExprSchemable` and `PhysicalExpr::return_field` simply return its own Field: <details><summary>LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation </summary> ```rust impl ExprSchemable for Expr { fn to_field( &self, schema: &dyn ExprSchema, ) -> Result<(Option<TableReference>, Arc<Field>)> { let (relation, schema_name) = self.qualified_name(); let field = match self { Expr::LambdaVariable(l) => Ok(Arc::clone(&l.field)), ... }?; Ok(( relation, Arc::new(field.as_ref().clone().with_name(schema_name)), )) } ... } impl PhysicalExpr for LambdaVariable { fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> { Ok(Arc::clone(&self.field)) } ... } ``` </details> <br> <details><summary>Possible alternatives discarded due to complexity, requiring downstream changes and implementation size:</summary> 1. Add a new set of TreeNode methods that provides the set of lambda parameter names seen during the traversal, so column nodes can be tested for whether they refer to a regular column or to a lambda parameter. 
Any downstream user that wants to support lambdas would need to use those methods instead of the existing ones. This would also add 1k+ lines to the PR. ```rust impl Expr { pub fn transform_with_lambdas_params< F: FnMut(Self, &HashSet<String>) -> Result<Transformed<Self>>, >( self, mut f: F, ) -> Result<Transformed<Self>> {} } ``` What minimize_join_filter would look like: ```rust fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter { let mut used_columns = HashSet::new(); expr.apply_with_lambdas_params(|expr, lambdas_params| { if let Some(col) = expr.as_any().downcast_ref::<Column>() { // don't include lambda parameters if !lambdas_params.contains(col.name()) { used_columns.insert(col.index()); } } Ok(TreeNodeRecursion::Continue) }) ... } ``` 2. Add a flag to the Column node indicating if it refers to a lambda parameter. Still requires checking for it on existing tree traversals that work on Columns (30+) and also downstream. ```rust //logical struct Column { pub relation: Option<TableReference>, pub name: String, pub spans: Spans, pub is_lambda_parameter: bool, } //physical struct Column { name: String, index: usize, is_lambda_parameter: bool, } ``` What minimize_join_filter would look like: ```rust fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter { let mut used_columns = HashSet::new(); expr.apply(|expr| { if let Some(col) = expr.as_any().downcast_ref::<Column>() { // don't include lambda parameters if !col.is_lambda_parameter { used_columns.insert(col.index()); } } Ok(TreeNodeRecursion::Continue) }) ... } ``` 1. Add a new set of TreeNode methods that provides a schema that includes the lambda parameters for the scope of the node being visited/transformed: ```rust impl Expr { pub fn transform_with_schema< F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>, >( self, schema: &DFSchema, f: F, ) -> Result<Transformed<Self>> { ... } ... 
other methods } ``` For any given HigherOrderFunction found during the traversal, a new schema is created for each lambda argument that contains its parameters, returned from HigherOrderUDF::lambda_parameters What it would look like: ```rust pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> { let mut has_placeholder = false; // Provide the schema as the first argument. // The transforming closure receives an adjusted_schema as argument self.transform_with_schema(schema, |mut expr, adjusted_schema| { match &mut expr { // Default to assuming the arguments are the same type Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => { // use adjusted_schema and not schema. Those expressions may contain // columns referring to a lambda parameter, whose Field would only be // available in adjusted_schema and not in schema rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?; rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?; } .... ``` 2. Make available through LogicalPlan and ExecutionPlan nodes a schema that includes all lambda parameters from all expressions owned by the node, and use this schema for tree traversals. 
For nodes that don't own any expressions, the regular schema can be returned ```rust impl LogicalPlan { fn lambda_extended_schema(&self) -> &DFSchema; } trait ExecutionPlan { fn lambda_extended_schema(&self) -> &DFSchema; } //usage impl LogicalPlan { pub fn replace_params_with_values( self, param_values: &ParamValues, ) -> Result<LogicalPlan> { self.transform_up_with_subqueries(|plan| { // use plan.lambda_extended_schema() containing lambda parameters // instead of plan.schema() which won't let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema()); let name_preserver = NamePreserver::new(&plan); plan.map_expressions(|e| { // if this expression is a child of a lambda and contains columns referring to its parameters // the lambda_extended_schema already contains them let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?; .... ``` </details> <br> --------- Co-authored-by: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> Co-authored-by: Lía Adriana <lia.castaneda@datadoghq.com>
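The `LambdaVariable` versus `Column` argument above can be reproduced on a toy expression tree. The `Expr` enum and `used_columns` function below are simplified, hypothetical stand-ins (not DataFusion's types); they only show that a traversal matching on column nodes never sees lambda parameters once those have their own variant, so it needs no lambda-specific check.

```rust
use std::collections::HashSet;

/// Toy expression tree; variant names echo the PR but the shapes are simplified.
#[derive(Debug, Clone)]
pub enum Expr {
    /// Reference to an input column by index.
    Column(usize),
    /// Reference to a lambda parameter by name.
    LambdaVariable(String),
    Literal(i64),
    Binary(Box<Expr>, Box<Expr>),
    /// Stand-in for a higher-order function call such as `array_transform`.
    Call { name: String, args: Vec<Expr> },
    Lambda { params: Vec<String>, body: Box<Expr> },
}

/// Collects the indices of real input columns used by `expr`.
/// Lambda parameters are a different variant, so they are skipped
/// without the traversal tracking which names are in scope.
pub fn used_columns(expr: &Expr, out: &mut HashSet<usize>) {
    match expr {
        Expr::Column(index) => {
            out.insert(*index);
        }
        Expr::LambdaVariable(_) | Expr::Literal(_) => {}
        Expr::Binary(left, right) => {
            used_columns(left, out);
            used_columns(right, out);
        }
        Expr::Call { args, .. } => {
            for arg in args {
                used_columns(arg, out);
            }
        }
        Expr::Lambda { body, .. } => used_columns(body, out),
    }
}
```

For a tree modelling `array_transform(col0, v -> v != 2)`, only column `0` is collected; had `v` been encoded as a `Column`, the traversal would have needed one of the discarded alternatives to tell the two apart.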
This a clean version of apache#18921 to make it easier to review **this is a breaking change due to adding variant to `Expr` enum, new methods on traits `Session`, `FunctionRegistry` and `ContextProvider` and a new arg on `TaskContext::new`** This PR adds support for lambdas and the `array_transform` function used to test the lambda implementation. Example usage: ```sql SELECT array_transform([2, 3], v -> v != 2); [false, true] -- arbitrally nested lambdas are also supported SELECT array_transform([[[2, 3]]], m -> array_transform(m, l -> array_transform(l, v -> v*2))); [[[4, 6]]] ``` Note: column capture has been removed for now and will be added on a follow on PR, see apache#21172 Some comments on code snippets of this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudo code 3 new `Expr` variants are added, `HigherOrderFunction`, owing a new trait `HigherOrderUDF`, which is like a `ScalarFunction`/`ScalarUDFImpl` with support for lambdas, `Lambda`, for the lambda body and it's parameters names, and `LambdaVariable`, which is like `Column` but for lambdas parameters. Their logical representations: ```rust enum Expr { // array_transform([2, 3], v -> v != 2) HigherOrderFunction(HigherOrderFunction), // v -> v != 2 Lambda(Lambda), // v, of the lambda body: v != 2 LambdaVariable(LambdaVariable), ... 
} // array_transform([2, 3], v -> v != 2) struct HigherOrderFunction { // global instance of array_transform pub func: Arc<dyn HigherOrderUDF>, // [Expr::ScalarValue([2, 3]), Expr::Lambda(v -> v != 2)] pub args: Vec<Expr>, } // v -> v != 2 struct Lambda { // ["v"] pub params: Vec<String>, // v != 2 pub body: Box<Expr>, } // v, of the lambda body: v != 2 struct LambdaVariable { // "v" pub name: String, // Field::new("", DataType::Int32, false) // Note: a follow on PR will make this field optional // to free expr_api from specifying it beforehand, // and add resolve_lambda_variables method to Expr, // similar to Expr::Placeholder, see apache#21172 pub field: FieldRef, pub spans: Spans, } ``` The example would be planned into a tree like this: ``` HigherOrderFunctionExpression name: array_transform children: 1. ListExpression [2,3] 2. LambdaExpression parameters: ["v"] body: BinaryExpression (!=) left: LambdaVariableExpression("v", Field::new("", Int32, false)) right: LiteralExpression("2") ``` The physical counterparts definition: ```rust struct HigherOrderFunctionExpr { // global instance of array_transform fun: Arc<dyn HigherOrderUDF>, // "array_transform" name: String, // [LiteralExpr([2, 3], LambdaExpr("v -> v != 2"))] args: Vec<Arc<dyn PhysicalExpr>>, // [1], the positions at args that contains lambdas lambda_positions: Vec<usize>, // Field::new("", DataType::new_list(DataType::Boolean, false), false) return_field: FieldRef, config_options: Arc<ConfigOptions>, } struct LambdaExpr { // ["v"] params: Vec<String>, // v -> v != 2 body: Arc<dyn PhysicalExpr>, } struct LambdaVariable { // Field::new("v", DataType::Int32, false) field: FieldRef, // 0, the first and only parameter, "v" index: usize, } ``` Note: For those who primarly wants to check if this lambda implementation supports their usecase and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of `HigherOrderUDF` and 
the `array_transform` implementations of the relevant `HigherOrderUDF` methods, which are collapsed only because of their size.

The added `HigherOrderUDF` trait is almost a clone of `ScalarUDFImpl`, with the exception of:

1. `return_field_from_args` and `invoke_with_args`, where `args.args` is now a list of enums with two variants, `Value` or `Lambda`, instead of a list of values
2. the addition of `lambda_parameters`, which returns a `Field` for each parameter supported by every lambda argument, based on the `Field` of the non-lambda arguments
3. the removal of `return_field` and of the deprecated `is_nullable` and `display_name`
4. it does not yet include analogues of the methods `preimage`, `placement`, `evaluate_bounds`, `propagate_constraints`, `output_ordering` and `preserves_lex_ordering`

<details><summary>HigherOrderUDF</summary>

```rust
trait HigherOrderUDF {
    /// Return the field of all the parameters supported by all the supported lambdas of this function
    /// based on the field of the value arguments. If a lambda supports multiple parameters, or if multiple
    /// lambdas are supported and some are optional, all should be returned,
    /// regardless of whether they are used on a particular invocation
    ///
    /// Tip: If you have a [`HigherOrderFunction`] invocation, you can call the helper
    /// [`HigherOrderFunction::lambda_parameters`] instead of this method directly
    ///
    /// [`HigherOrderFunction`]: crate::expr::HigherOrderFunction
    /// [`HigherOrderFunction::lambda_parameters`]: crate::expr::HigherOrderFunction::lambda_parameters
    ///
    /// Example for array_transform:
    ///
    /// `array_transform([2.0, 8.0], v -> v > 4.0)`
    ///
    /// ```ignore
    /// let lambda_parameters = array_transform.lambda_parameters(&[
    ///      Arc::new(Field::new("", DataType::new_list(DataType::Float32, false), false)), // the Field of the literal `[2.0, 8.0]`
    /// ])?;
    ///
    /// assert_eq!(
    ///      lambda_parameters,
    ///      vec![
    ///         // the lambda supported parameters, regardless of how many are actually used
    ///         vec![
    ///             // the value being transformed
    ///             Field::new("", DataType::Float32, false),
    ///         ]
    ///      ]
    /// )
    /// ```
    ///
    /// The implementation can assume that some other part of the code has coerced
    /// the actual argument types to match [`Self::signature`].
    fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>>;
    fn return_field_from_args(&self, args: LambdaReturnFieldArgs) -> Result<FieldRef>;
    fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue>;
    // ... omitted methods that are similar in ScalarUDFImpl
}

/// An argument to a lambda function
pub enum ValueOrLambda<V, L> {
    /// A value with associated data
    Value(V),
    /// A lambda with associated data
    Lambda(L),
}

/// Information about arguments passed to the function
///
/// This structure contains metadata about how the function was called
/// such as the type of the arguments, any scalar arguments and if the
/// arguments can (ever) be null
///
/// See [`HigherOrderUDF::return_field_from_args`] for more information
pub struct LambdaReturnFieldArgs<'a> {
    /// The data types of the arguments to the function
    ///
    /// If argument `i` to the function is a lambda, it will be the field of the result of the
    /// lambda if evaluated with the parameters returned from [`HigherOrderUDF::lambda_parameters`]
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[
    ///     ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)),
    ///     ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false))
    /// ]`
    pub arg_fields: &'a [ValueOrLambda<FieldRef, FieldRef>],
    /// Is argument `i` to the function a scalar (constant)?
    ///
    /// If the argument `i` is not a scalar, it will be None
    ///
    /// For example, if a function is called like `array_transform([1], v -> v == 5)`
    /// this field will be `[Some(ScalarValue::List(...)), None]`
    pub scalar_arguments: &'a [Option<&'a ScalarValue>],
}

/// Arguments passed to [`HigherOrderUDF::invoke_with_args`] when invoking a
/// lambda function.
pub struct HigherOrderFunctionArgs {
    /// The evaluated arguments and lambdas to the function
    pub args: Vec<ValueOrLambda<ColumnarValue, LambdaArgument>>,
    /// Field associated with each arg, if it exists
    /// For lambdas, it will be the field of the result of
    /// the lambda if evaluated with the parameters
    /// returned from [`HigherOrderUDF::lambda_parameters`]
    pub arg_fields: Vec<ValueOrLambda<FieldRef, FieldRef>>,
    /// The number of rows in the record batch being evaluated
    pub number_rows: usize,
    /// The return field of the lambda function returned
    /// (from `return_field_from_args`) when creating the
    /// physical expression from the logical expression
    pub return_field: FieldRef,
    /// The config options at execution time
    pub config_options: Arc<ConfigOptions>,
}

/// A lambda argument to a HigherOrderFunction
pub struct LambdaArgument {
    /// The parameters defined in this lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be `vec![Field::new("v", DataType::Int32, true)]`
    params: Vec<FieldRef>,
    /// The body of the lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be the physical expression of `-v`
    body: Arc<dyn PhysicalExpr>,
}

impl LambdaArgument {
    /// Evaluate this lambda
    /// `args` should evaluate to the value of each parameter
    /// of the corresponding lambda returned in [HigherOrderUDF::lambda_parameters].
    pub fn evaluate(
        &self,
        args: &[&dyn Fn() -> Result<ArrayRef>],
    ) -> Result<ColumnarValue> {
        let columns = args
            .iter()
            .take(self.params.len())
            .map(|arg| arg())
            .collect::<Result<_>>()?;

        let schema = Arc::new(Schema::new(self.params.clone()));

        let batch = RecordBatch::try_new(schema, columns)?;

        self.body.evaluate(&batch)
    }
}
```

</details>

<details><summary>array_transform lambda_parameters implementation</summary>

```rust
impl HigherOrderUDF for ArrayTransform {
    fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>> {
        let list = if value_fields.len() == 1 {
            &value_fields[0]
        } else {
            return plan_err!(
                "{} function requires 1 value arguments, got {}",
                self.name(),
                value_fields.len()
            );
        };

        let field = match list.data_type() {
            DataType::List(field) => field,
            DataType::LargeList(field) => field,
            DataType::FixedSizeList(field, _) => field,
            _ => return plan_err!("expected list, got {list}"),
        };

        // we don't need to check whether the lambda contains more than two parameters,
        // e.g. array_transform([], (v, i, j) -> v+i+j), as datafusion will do that for us
        let value = Field::new("", field.data_type().clone(), field.is_nullable())
            .with_metadata(field.metadata().clone());

        Ok(vec![vec![value]])
    }
}
```

</details>

<details><summary>array_transform return_field_from_args implementation</summary>

```rust
fn value_lambda_pair<'a, V: Debug, L: Debug>(
    name: &str,
    args: &'a [ValueOrLambda<V, L>],
) -> Result<(&'a V, &'a L)> {
    let [value, lambda] = take_function_args(name, args)?;

    let (ValueOrLambda::Value(value), ValueOrLambda::Lambda(lambda)) = (value, lambda)
    else {
        return plan_err!(
            "{name} expects a value followed by a lambda, got {value:?} and {lambda:?}"
        );
    };

    Ok((value, lambda))
}

impl HigherOrderUDF for ArrayTransform {
    fn return_field_from_args(
        &self,
        args: HigherOrderReturnFieldArgs,
    ) -> Result<Arc<Field>> {
        let (list, lambda) = value_lambda_pair(self.name(), args.arg_fields)?;

        // lambda is the resulting field of executing the lambda body
        // with the parameters returned in lambda_parameters
        let field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            lambda.data_type().clone(),
            lambda.is_nullable(),
        ));

        let return_type = match list.data_type() {
            DataType::List(_) => DataType::List(field),
            DataType::LargeList(_) => DataType::LargeList(field),
            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
            other => plan_err!("expected list, got {other}")?,
        };

        Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
    }
}
```

</details>

<details><summary>array_transform invoke_with_args implementation</summary>

```rust
impl HigherOrderUDF for ArrayTransform {
    fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue> {
        let (list, lambda) = value_lambda_pair(self.name(), &args.args)?;

        let list_array = list.to_array(args.number_rows)?;

        // Fast path for fully null input array and also the only way to safely work with
        // a fully null fixed size list array as it can't be handled by remove_list_null_values below
        if list_array.null_count() == list_array.len() {
            return Ok(ColumnarValue::Array(new_null_array(
                args.return_type(),
                list_array.len(),
            )));
        }

        // as per list_values docs, if list_array is sliced, list_values will be sliced too,
        // so before constructing the transformed array below, we must adjust the list offsets with
        // adjust_offsets_for_slice
        let list_values = list_values(&list_array)?;

        // by passing closures, lambda.evaluate can evaluate only those actually needed
        let values_param = || Ok(Arc::clone(&list_values));

        // call the transforming lambda
        let transformed_values = lambda
            .evaluate(&[&values_param])?
            .into_array(list_values.len())?;

        let field = match args.return_field.data_type() {
            DataType::List(field)
            | DataType::LargeList(field)
            | DataType::FixedSizeList(field, _) => Arc::clone(field),
            _ => {
                return exec_err!(
                    "{} expected ScalarFunctionArgs.return_field to be a list, got {}",
                    self.name(),
                    args.return_field
                );
            }
        };

        let transformed_list = match list_array.data_type() {
            DataType::List(_) => {
                let list = list_array.as_list();

                // since we called list_values above which would return sliced values for
                // a sliced list, we must adjust the offsets here as otherwise they would be invalid
                let adjusted_offsets = adjust_offsets_for_slice(list);

                Arc::new(ListArray::new(
                    field,
                    adjusted_offsets,
                    transformed_values,
                    list.nulls().cloned(),
                )) as ArrayRef
            }
            DataType::LargeList(_) => {
                let large_list = list_array.as_list();

                // since we called list_values above which would return sliced values for
                // a sliced list, we must adjust the offsets here as otherwise they would be invalid
                let adjusted_offsets = adjust_offsets_for_slice(large_list);

                Arc::new(LargeListArray::new(
                    field,
                    adjusted_offsets,
                    transformed_values,
                    large_list.nulls().cloned(),
                ))
            }
            DataType::FixedSizeList(_, value_length) => {
                Arc::new(FixedSizeListArray::new(
                    field,
                    *value_length,
                    transformed_values,
                    list_array.as_fixed_size_list().nulls().cloned(),
                ))
            }
            other => exec_err!("expected list, got {other}")?,
        };

        Ok(ColumnarValue::Array(transformed_list))
    }
}
```

</details>

<details><summary>How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example</summary>

```rust
// this is called at sql planning
let lambda_parameters = array_transform.lambda_parameters(&[
    Field::new("", DataType::new_list(DataType::Int32, false), false), // the Field of the [2, 3] literal
])?;

assert_eq!(
    lambda_parameters,
    vec![
        // the parameters that *can* be declared on the lambda, and not only
        // those actually declared: the implementation doesn't need to care
        // about it
        vec![
            Field::new("", DataType::Int32, false), // the list inner value
        ]]
);

// this is called every time ExprSchemable is called on a HigherOrderFunction
let return_field = array_transform.return_field_from_args(&LambdaReturnFieldArgs {
    arg_fields: &[
        ValueOrLambda::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)),
        ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false)), // the return_field of the expression "v != 2" when "v" is of the type returned in lambda_parameters
    ],
    scalar_arguments // irrelevant
})?;

assert_eq!(return_field, Field::new("", DataType::new_list(DataType::Boolean, false), false));

let value = array_transform.invoke_with_args(&HigherOrderFunctionArgs {
    args: vec![
        ValueOrLambda::Value(List([2, 3])),
        ValueOrLambda::Lambda(LambdaArgument of `v -> v != 2`),
    ],
    arg_fields, // same as above
    number_rows: 1,
    return_field, // same as above
    config_options, // irrelevant
})?;

assert_eq!(value, BooleanArray::from([false, true]))
```

</details>
<br>
<br>

A `HigherOrderUDF`/`HigherOrderUDFImpl` pair, like `ScalarUDF`/`ScalarUDFImpl`, was not used because that split exists only [to maintain backwards compatibility with the older API](https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html#api-note).

<br>

Why `LambdaVariable` and not `Column`:

Existing tree traversals that operate on columns would break if some column
nodes referred to a lambda parameter rather than a real column. For example, projection pushdown would try to push down the lambda parameter "v", which does not exist in any input table.

Example of another traversal that would break:

```rust
fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // if this is a lambda column, this function will break
            used_columns.insert(col.index());
        }
        Ok(TreeNodeRecursion::Continue)
    });
    ...
}
```

Furthermore, the implementations of `ExprSchemable` and `PhysicalExpr::return_field` for `Column` expect the schema they receive as an argument to contain an entry for the column's name, which is not the case for lambda parameters.

Because `LambdaVariable` carries a `FieldRef` that is resolved at construction time in the SQL planner, `ExprSchemable` and `PhysicalExpr::return_field` simply return its own field:

<details><summary>LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation</summary>

```rust
impl ExprSchemable for Expr {
    fn to_field(
        &self,
        schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)> {
        let (relation, schema_name) = self.qualified_name();
        let field = match self {
            Expr::LambdaVariable(l) => Ok(Arc::clone(&l.field)),
            ...
        }?;

        Ok((
            relation,
            Arc::new(field.as_ref().clone().with_name(schema_name)),
        ))
    }
    ...
}

impl PhysicalExpr for LambdaVariable {
    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
        Ok(Arc::clone(&self.field))
    }
    ...
}
```

</details>
<br>

<details><summary>Possible alternatives discarded due to complexity, requiring downstream changes and implementation size:</summary>

1. Add a new set of `TreeNode` methods that provide the set of lambda parameter names seen during the traversal, so that column nodes can be tested for whether they refer to a regular column or to a lambda parameter.
Any downstream user that wants to support lambdas would need to use those methods instead of the existing ones. This would also add 1k+ lines to the PR.

```rust
impl Expr {
    pub fn transform_with_lambdas_params<
        F: FnMut(Self, &HashSet<String>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> { ... }
}
```

How `minimize_join_filter` would look:

```rust
fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_lambdas_params(|expr, lambdas_params| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // don't include lambda parameters
            if !lambdas_params.contains(col.name()) {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
```

2. Add a flag to the `Column` node indicating whether it refers to a lambda parameter. This still requires checking for the flag in the existing tree traversals that work on columns (30+), and also downstream.

```rust
// logical
struct Column {
    pub relation: Option<TableReference>,
    pub name: String,
    pub spans: Spans,
    pub is_lambda_parameter: bool,
}

// physical
struct Column {
    name: String,
    index: usize,
    is_lambda_parameter: bool,
}
```

How `minimize_join_filter` would look:

```rust
fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // don't include lambda parameters
            if !col.is_lambda_parameter {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
```

3. Add a new set of `TreeNode` methods that provide a schema that includes the lambda parameters for the scope of the node being visited/transformed:

```rust
impl Expr {
    pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> { ... }
    // ... other methods
}
```

For any given `HigherOrderFunction` found during the traversal, a new schema is created for each lambda argument. It contains the lambda's parameters, as returned from `HigherOrderUDF::lambda_parameters`.

How it would look:

```rust
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
    let mut has_placeholder = false;
    // Provide the schema as the first argument.
    // The transforming closure receives an adjusted_schema as argument
    self.transform_with_schema(schema, |mut expr, adjusted_schema| {
        match &mut expr {
            // Default to assuming the arguments are the same type
            Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                // use adjusted_schema and not schema. Those expressions may contain
                // columns referring to a lambda parameter, whose Field would only be
                // available in adjusted_schema and not in schema
                rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
            }
    ....
```

4. Make available through `LogicalPlan` and `ExecutionPlan` nodes a schema that includes all lambda parameters from all expressions owned by the node, and use this schema for tree traversals.
For nodes that don't own any expression, the regular schema can be returned.

```rust
impl LogicalPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

trait ExecutionPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

// usage
impl LogicalPlan {
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            // use plan.lambda_extended_schema() containing lambda parameters
            // instead of plan.schema() which won't
            let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // if this expression is a child of a lambda and contains columns referring to its parameters,
                // the lambda_extended_schema already contains them
                let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
    ....
```

</details>
<br>

---------

Co-authored-by: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
Co-authored-by: Lía Adriana <lia.castaneda@datadoghq.com>
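
As a standalone illustration of the "Why `LambdaVariable` and not `Column`" reasoning above, here is a minimal sketch using a toy expression tree. `ToyExpr`, `collect_columns` and `example_columns` are hypothetical names made up for this example and are not part of DataFusion. The sketch assumes only what the description states: lambda parameters get their own node type, so a traversal that matches on columns never sees them and needs no lambda-specific check.

```rust
use std::collections::HashSet;

// Hypothetical, heavily simplified expression tree (not DataFusion's `Expr`).
#[allow(dead_code)]
#[derive(Debug)]
enum ToyExpr {
    Column(String),
    LambdaVariable(String),
    Literal(i64),
    NotEq(Box<ToyExpr>, Box<ToyExpr>),
    Lambda { params: Vec<String>, body: Box<ToyExpr> },
    HigherOrderFunction { name: String, args: Vec<ToyExpr> },
}

// A column-collecting traversal written with no knowledge of lambdas:
// it only reacts to `Column` nodes and recurses into everything else.
fn collect_columns(expr: &ToyExpr, out: &mut HashSet<String>) {
    match expr {
        ToyExpr::Column(name) => {
            out.insert(name.clone());
        }
        // Lambda parameters are a distinct node, so they are skipped here.
        ToyExpr::LambdaVariable(_) | ToyExpr::Literal(_) => {}
        ToyExpr::NotEq(left, right) => {
            collect_columns(left, out);
            collect_columns(right, out);
        }
        ToyExpr::Lambda { body, .. } => collect_columns(body, out),
        ToyExpr::HigherOrderFunction { args, .. } => {
            for arg in args {
                collect_columns(arg, out);
            }
        }
    }
}

// Builds a toy version of `array_transform(a, v -> v != 2)` and returns
// the sorted names of the real columns it references.
fn example_columns() -> Vec<String> {
    let expr = ToyExpr::HigherOrderFunction {
        name: "array_transform".to_string(),
        args: vec![
            ToyExpr::Column("a".to_string()),
            ToyExpr::Lambda {
                params: vec!["v".to_string()],
                body: Box::new(ToyExpr::NotEq(
                    Box::new(ToyExpr::LambdaVariable("v".to_string())),
                    Box::new(ToyExpr::Literal(2)),
                )),
            },
        ],
    };

    let mut seen = HashSet::new();
    collect_columns(&expr, &mut seen);

    let mut columns: Vec<String> = seen.into_iter().collect();
    columns.sort();
    columns
}

fn main() {
    // Only the real column "a" is reported; the lambda parameter "v" is not.
    println!("{:?}", example_columns());
}
```

Had "v" been modeled as `ToyExpr::Column("v")`, the same traversal would have reported it as a real column, which is the failure mode described for `minimize_join_filter`.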
This a clean version of apache#18921 to make it easier to review **this is a breaking change due to adding variant to `Expr` enum, new methods on traits `Session`, `FunctionRegistry` and `ContextProvider` and a new arg on `TaskContext::new`** This PR adds support for lambdas and the `array_transform` function used to test the lambda implementation. Example usage: ```sql SELECT array_transform([2, 3], v -> v != 2); [false, true] -- arbitrally nested lambdas are also supported SELECT array_transform([[[2, 3]]], m -> array_transform(m, l -> array_transform(l, v -> v*2))); [[[4, 6]]] ``` Note: column capture has been removed for now and will be added on a follow on PR, see apache#21172 Some comments on code snippets of this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudo code 3 new `Expr` variants are added, `HigherOrderFunction`, owing a new trait `HigherOrderUDF`, which is like a `ScalarFunction`/`ScalarUDFImpl` with support for lambdas, `Lambda`, for the lambda body and it's parameters names, and `LambdaVariable`, which is like `Column` but for lambdas parameters. Their logical representations: ```rust enum Expr { // array_transform([2, 3], v -> v != 2) HigherOrderFunction(HigherOrderFunction), // v -> v != 2 Lambda(Lambda), // v, of the lambda body: v != 2 LambdaVariable(LambdaVariable), ... 
} // array_transform([2, 3], v -> v != 2) struct HigherOrderFunction { // global instance of array_transform pub func: Arc<dyn HigherOrderUDF>, // [Expr::ScalarValue([2, 3]), Expr::Lambda(v -> v != 2)] pub args: Vec<Expr>, } // v -> v != 2 struct Lambda { // ["v"] pub params: Vec<String>, // v != 2 pub body: Box<Expr>, } // v, of the lambda body: v != 2 struct LambdaVariable { // "v" pub name: String, // Field::new("", DataType::Int32, false) // Note: a follow on PR will make this field optional // to free expr_api from specifying it beforehand, // and add resolve_lambda_variables method to Expr, // similar to Expr::Placeholder, see apache#21172 pub field: FieldRef, pub spans: Spans, } ``` The example would be planned into a tree like this: ``` HigherOrderFunctionExpression name: array_transform children: 1. ListExpression [2,3] 2. LambdaExpression parameters: ["v"] body: BinaryExpression (!=) left: LambdaVariableExpression("v", Field::new("", Int32, false)) right: LiteralExpression("2") ``` The physical counterparts definition: ```rust struct HigherOrderFunctionExpr { // global instance of array_transform fun: Arc<dyn HigherOrderUDF>, // "array_transform" name: String, // [LiteralExpr([2, 3], LambdaExpr("v -> v != 2"))] args: Vec<Arc<dyn PhysicalExpr>>, // [1], the positions at args that contains lambdas lambda_positions: Vec<usize>, // Field::new("", DataType::new_list(DataType::Boolean, false), false) return_field: FieldRef, config_options: Arc<ConfigOptions>, } struct LambdaExpr { // ["v"] params: Vec<String>, // v -> v != 2 body: Arc<dyn PhysicalExpr>, } struct LambdaVariable { // Field::new("v", DataType::Int32, false) field: FieldRef, // 0, the first and only parameter, "v" index: usize, } ``` Note: For those who primarly wants to check if this lambda implementation supports their usecase and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of `HigherOrderUDF` and 
the `array_transform` implementation of `HigherOrderUDF` relevant methods, collapsed due to their size The added `HigherOrderUDF` trait is almost a clone of `ScalarUDFImpl`, with the exception of: 1. `return_field_from_args` and `invoke_with_args`, where now `args.args` is a list of enums with two variants: `Value` or `Lambda` instead of a list of values 2. the addition of `lambda_parameters`, which return a `Field` for each parameter supported for every lambda argument based on the `Field` of the non lambda arguments 3. the removal of `return_field` and the deprecated ones `is_nullable` and `display_name`. 4. Not yet includes analogues to the methods preimage, placement, evaluate_bounds, propagate_constraints, output_ordering and preserves_lex_ordering <details><summary>HigherOrderUDF</summary> ```rust trait HigherOrderUDF { /// Return the field of all the parameters supported by all the supported lambdas of this function /// based on the field of the value arguments. If a lambda support multiple parameters, or if multiple /// lambdas are supported and some are optional, all should be returned, /// regardless of whether they are used on a particular invocation /// /// Tip: If you have a [`HigherOrderFunction`] invocation, you can call the helper /// [`HigherOrderFunction::lambda_parameters`] instead of this method directly /// /// [`HigherOrderFunction`]: crate::expr::HigherOrderFunction /// [`HigherOrderFunction::lambda_parameters`]: crate::expr::HigherOrderFunction::lambda_parameters /// /// Example for array_transform: /// /// `array_transform([2.0, 8.0], v -> v > 4.0)` /// /// ```ignore /// let lambda_parameters = array_transform.lambda_parameters(&[ /// Arc::new(Field::new("", DataType::new_list(DataType::Float32, false))), // the Field of the literal `[2, 8]` /// ])?; /// /// assert_eq!( /// lambda_parameters, /// vec![ /// // the lambda supported parameters, regardless of how many are actually used /// vec![ /// // the value being transformed /// 
Field::new("", DataType::Float32, false), /// ] /// ] /// ) /// ``` /// /// The implementation can assume that some other part of the code has coerced /// the actual argument types to match [`Self::signature`]. fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>>; fn return_field_from_args(&self, args: LambdaReturnFieldArgs) -> Result<FieldRef>; fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue>; // ... omitted methods that are similar in ScalarUDFImpl } /// An argument to a lambda function pub enum ValueOrLambda<V, L> { /// A value with associated data Value(V), /// A lambda with associated data Lambda(L), } /// Information about arguments passed to the function /// /// This structure contains metadata about how the function was called /// such as the type of the arguments, any scalar arguments and if the /// arguments can (ever) be null /// /// See [`HigherOrderUDF::return_field_from_args`] for more information pub struct LambdaReturnFieldArgs<'a> { /// The data types of the arguments to the function /// /// If argument `i` to the function is a lambda, it will be the field of the result of the /// lambda if evaluated with the parameters returned from [`HigherOrderUDF::lambda_parameters`] /// /// For example, with `array_transform([1], v -> v == 5)` /// this field will be `[ /// ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)), /// ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false)) /// ]` pub arg_fields: &'a [ValueOrLambda<FieldRef, FieldRef>], /// Is argument `i` to the function a scalar (constant)? /// /// If the argument `i` is not a scalar, it will be None /// /// For example, if a function is called like `array_transform([1], v -> v == 5)` /// this field will be `[Some(ScalarValue::List(...), None]` pub scalar_arguments: &'a [Option<&'a ScalarValue>], } /// Arguments passed to [`HigherOrderUDF::invoke_with_args`] when invoking a /// lambda function. 
pub struct HigherOrderFunctionArgs { /// The evaluated arguments and lambdas to the function pub args: Vec<ValueOrLambda<ColumnarValue, LambdaArgument>>, /// Field associated with each arg, if it exists /// For lambdas, it will be the field of the result of /// the lambda if evaluated with the parameters /// returned from [`HigherOrderUDF::lambda_parameters`] pub arg_fields: Vec<ValueOrLambda<FieldRef, FieldRef>>, /// The number of rows in record batch being evaluated pub number_rows: usize, /// The return field of the lambda function returned /// (from `return_field_from_args`) when creating the /// physical expression from the logical expression pub return_field: FieldRef, /// The config options at execution time pub config_options: Arc<ConfigOptions>, } /// A lambda argument to a HigherOrderFunction pub struct LambdaArgument { /// The parameters defined in this lambda /// /// For example, for `array_transform([2], v -> -v)`, /// this will be `vec![Field::new("v", DataType::Int32, true)]` params: Vec<FieldRef>, /// The body of the lambda /// /// For example, for `array_transform([2], v -> -v)`, /// this will be the physical expression of `-v` body: Arc<dyn PhysicalExpr>, } impl LambdaArgument { /// Evaluate this lambda /// `args` should evalute to the value of each parameter /// of the correspondent lambda returned in [HigherOrderUDF::lambda_parameters]. 
pub fn evaluate( &self, args: &[&dyn Fn() -> Result<ArrayRef>], ) -> Result<ColumnarValue> { let columns = args .iter() .take(self.params.len()) .map(|arg| arg()) .collect::<Result<_>>()?; let schema = Arc::new(Schema::new(self.params.clone())); let batch = RecordBatch::try_new(schema, columns)?; self.body.evaluate(&batch) } } ``` </details> <details><summary>array_transform lambda_parameters implementation</summary> ```rust impl HigherOrderUDF for ArrayTransform { fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>> { let list = if value_fields.len() == 1 { &value_fields[0] } else { return plan_err!( "{} function requires 1 value arguments, got {}", self.name(), value_fields.len() ); }; let field = match list.data_type() { DataType::List(field) => field, DataType::LargeList(field) => field, DataType::FixedSizeList(field, _) => field, _ => return plan_err!("expected list, got {list}"), }; // we don't need to check whether the lambda contains more than two parameters, // e.g. 
array_transform([], (v, i, j) -> v+i+j), as datafusion will do that for us let value = Field::new("", field.data_type().clone(), field.is_nullable()) .with_metadata(field.metadata().clone()); Ok(vec![vec![value]]) } } ``` </details> <details><summary>array_transform return_field_from_args implementation</summary> ```rust fn value_lambda_pair<'a, V: Debug, L: Debug>( name: &str, args: &'a [ValueOrLambda<V, L>], ) -> Result<(&'a V, &'a L)> { let [value, lambda] = take_function_args(name, args)?; let (ValueOrLambda::Value(value), ValueOrLambda::Lambda(lambda)) = (value, lambda) else { return plan_err!( "{name} expects a value followed by a lambda, got {value:?} and {lambda:?}" ); }; Ok((value, lambda)) } impl HigherOrderUDF for ArrayTransform { fn return_field_from_args( &self, args: HigherOrderReturnFieldArgs, ) -> Result<Arc<Field>> { let (list, lambda) = value_lambda_pair(self.name(), args.arg_fields)?; // lambda is the resulting field of executing the lambda body // with the parameters returned in lambda_parameters let field = Arc::new(Field::new( Field::LIST_FIELD_DEFAULT_NAME, lambda.data_type().clone(), lambda.is_nullable(), )); let return_type = match list.data_type() { DataType::List(_) => DataType::List(field), DataType::LargeList(_) => DataType::LargeList(field), DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size), other => plan_err!("expected list, got {other}")?, }; Ok(Arc::new(Field::new("", return_type, list.is_nullable()))) } } ``` </details> <details><summary>array_transform invoke_with_args implementation</summary> ```rust impl HigherOrderUDF for ArrayTransform { fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue> { let (list, lambda) = value_lambda_pair(self.name(), &args.args)?; let list_array = list.to_array(args.number_rows)?; // Fast path for fully null input array and also the only way to safely work with // a fully null fixed size list array as it can't be handled by remove_list_null_values 
        // below
        if list_array.null_count() == list_array.len() {
            return Ok(ColumnarValue::Array(new_null_array(
                args.return_type(),
                list_array.len(),
            )));
        }

        // as per list_values docs, if list_array is sliced, list_values will be sliced too,
        // so before constructing the transformed array below, we must adjust the list offsets with
        // adjust_offsets_for_slice
        let list_values = list_values(&list_array)?;

        // by passing closures, lambda.evaluate can evaluate only those actually needed
        let values_param = || Ok(Arc::clone(&list_values));

        // call the transforming lambda
        let transformed_values = lambda
            .evaluate(&[&values_param])?
            .into_array(list_values.len())?;

        let field = match args.return_field.data_type() {
            DataType::List(field)
            | DataType::LargeList(field)
            | DataType::FixedSizeList(field, _) => Arc::clone(field),
            _ => {
                return exec_err!(
                    "{} expected HigherOrderFunctionArgs.return_field to be a list, got {}",
                    self.name(),
                    args.return_field
                );
            }
        };

        let transformed_list = match list_array.data_type() {
            DataType::List(_) => {
                let list = list_array.as_list();

                // since we called list_values above which would return sliced values for
                // a sliced list, we must adjust the offsets here as otherwise they would be invalid
                let adjusted_offsets = adjust_offsets_for_slice(list);

                Arc::new(ListArray::new(
                    field,
                    adjusted_offsets,
                    transformed_values,
                    list.nulls().cloned(),
                )) as ArrayRef
            }
            DataType::LargeList(_) => {
                let large_list = list_array.as_list();

                // since we called list_values above which would return sliced values for
                // a sliced list, we must adjust the offsets here as otherwise they would be invalid
                let adjusted_offsets = adjust_offsets_for_slice(large_list);

                Arc::new(LargeListArray::new(
                    field,
                    adjusted_offsets,
                    transformed_values,
                    large_list.nulls().cloned(),
                ))
            }
            DataType::FixedSizeList(_, value_length) => {
                Arc::new(FixedSizeListArray::new(
                    field,
                    *value_length,
                    transformed_values,
                    list_array.as_fixed_size_list().nulls().cloned(),
                ))
            }
            other => exec_err!("expected list, got {other}")?,
        };

        Ok(ColumnarValue::Array(transformed_list))
    }
}
```

</details>

<details><summary>How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example</summary>

```rust
// this is called at sql planning
let lambda_parameters = array_transform.lambda_parameters(&[
    Field::new("", DataType::new_list(DataType::Int32, false), false), // the Field of the [2, 3] literal
])?;

assert_eq!(
    lambda_parameters,
    vec![
        // the parameters that *can* be declared on the lambda, and not only
        // those actually declared: the implementation doesn't need to care
        // about it
        vec![
            Field::new("", DataType::Int32, false), // the list inner value
        ]]
);

// this is called every time ExprSchemable is called on a HigherOrderFunction
let return_field = array_transform.return_field_from_args(HigherOrderReturnFieldArgs {
    arg_fields: &[
        ValueOrLambda::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)),
        // the return_field of the expression "v != 2" when "v" is of the type returned in lambda_parameters
        ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false)),
    ],
    scalar_arguments // irrelevant
})?;

assert_eq!(return_field, Field::new("", DataType::new_list(DataType::Boolean, false), false));

let value = array_transform.invoke_with_args(HigherOrderFunctionArgs {
    args: vec![
        ValueOrLambda::Value(List([2, 3])),
        ValueOrLambda::Lambda(LambdaArgument of `v -> v != 2`),
    ],
    arg_fields, // same as above
    number_rows: 1,
    return_field, // same as above
    config_options, // irrelevant
})?;

assert_eq!(value, BooleanArray::from([false, true]))
```

</details>

<br>
<br>

A `HigherOrderUDF`/`HigherOrderUDFImpl` pair, like `ScalarUDF`/`ScalarUDFImpl`, was not used because that split exists only [to maintain backwards compatibility with the older API](https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html#api-note)

</details>

<br>

Why `LambdaVariable` and not `Column`:

Existing tree traversals that operate on columns would break if some column
nodes referred to a lambda parameter and not a real column. In the example query, projection pushdown would try to push the lambda parameter "v", which won't exist in table "t".

Example of code of another traversal that would break:

```rust
fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // if this is a lambda column, this function will break
            used_columns.insert(col.index());
        }
        Ok(TreeNodeRecursion::Continue)
    });
    ...
}
```

Furthermore, the implementation of `ExprSchemable` and `PhysicalExpr::return_field` for `Column` expects that the schema it receives as an argument contains an entry for its name, which is not the case for lambda parameters.

By including a `FieldRef` on `LambdaVariable`, resolved at construction time in the SQL planner, `ExprSchemable` and `PhysicalExpr::return_field` simply return its own `Field`:

<details><summary>LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation</summary>

```rust
impl ExprSchemable for Expr {
    fn to_field(
        &self,
        schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)> {
        let (relation, schema_name) = self.qualified_name();
        let field = match self {
            Expr::LambdaVariable(l) => Ok(Arc::clone(&l.field)),
            ...
        }?;

        Ok((
            relation,
            Arc::new(field.as_ref().clone().with_name(schema_name)),
        ))
    }
    ...
}

impl PhysicalExpr for LambdaVariable {
    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
        Ok(Arc::clone(&self.field))
    }
    ...
}
```

</details>

<br>

<details><summary>Possible alternatives discarded due to complexity, requiring downstream changes and implementation size:</summary>

1. Add a new set of TreeNode methods that provides the set of lambda parameter names seen during the traversal, so column nodes can be tested if they refer to a regular column or to a lambda parameter.
Any downstream user that wants to support lambdas would need to use those methods instead of the existing ones. This would also add 1k+ lines to the PR.

```rust
impl Expr {
    pub fn transform_with_lambdas_params<
        F: FnMut(Self, &HashSet<String>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {}
}
```

How minimize_join_filter would look:

```rust
fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_lambdas_params(|expr, lambdas_params| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // don't include lambda parameters
            if !lambdas_params.contains(col.name()) {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
```

2. Add a flag to the Column node indicating if it refers to a lambda parameter. Still requires checking for it on existing tree traversals that work on Columns (30+) and also downstream.

```rust
//logical
struct Column {
    pub relation: Option<TableReference>,
    pub name: String,
    pub spans: Spans,
    pub is_lambda_parameter: bool,
}

//physical
struct Column {
    name: String,
    index: usize,
    is_lambda_parameter: bool,
}
```

How minimize_join_filter would look:

```rust
fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // don't include lambda parameters
            if !col.is_lambda_parameter {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
```

3. Add a new set of TreeNode methods that provides a schema that includes the lambda parameters for the scope of the node being visited/transformed:

```rust
impl Expr {
    pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> { ... }
    // ... other methods
}
```

For any given HigherOrderFunction found during the traversal, a new schema is created for each lambda argument that contains its parameters, returned from HigherOrderUDF::lambda_parameters

How it would look:

```rust
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
    let mut has_placeholder = false;
    // Provide the schema as the first argument.
    // The transforming closure receives an adjusted_schema as argument
    self.transform_with_schema(schema, |mut expr, adjusted_schema| {
        match &mut expr {
            // Default to assuming the arguments are the same type
            Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                // use adjusted_schema and not schema. Those expressions may contain
                // columns referring to a lambda parameter, whose Field would only be
                // available in adjusted_schema and not in schema
                rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
            }
            ....
```

4. Make available through LogicalPlan and ExecutionPlan nodes a schema that includes all lambda parameters from all expressions owned by the node, and use this schema for tree traversals.
For nodes which won't own any expression, the regular schema can be returned

```rust
impl LogicalPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

trait ExecutionPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

//usage
impl LogicalPlan {
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            // use plan.lambda_extended_schema() containing lambda parameters
            // instead of plan.schema() which won't
            let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // if this expression is a child of a lambda and contains columns referring to
                // its parameters, the lambda_extended_schema already contains them
                let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
                ....
```

</details>

<br>

---------

Co-authored-by: gstvg <28798827+gstvg@users.noreply.github.com>
Co-authored-by: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
## Which issue does this PR close?

No issue; this is a follow-up to apache#21679.

## Rationale for this change

In `ScalarUDF`, arity is enforced by the framework via `TypeSignature`. In `HigherOrderUDF`, functions with a fixed number of value and lambda arguments had to use `UserDefined` and manually validate arity inside `coerce_value_types`, which is boilerplate that every implementor has to repeat.

## What changes are included in this PR?

Adds a `HigherOrderTypeSignature::Exact { values: usize, lambdas: usize }` variant that enforces a fixed count of value and lambda arguments, so that `coerce_value_types` is called only for type coercion.

## Are these changes tested?

Yes, I added some planning tests for the exact signature in `datafusion/expr/src/type_coercion/functions.rs`.

## Are there any user-facing changes?

Yes, a new signature for HigherOrderSignature was added.

(cherry picked from commit fb26fd9)
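To make the arity check concrete, here is a minimal sketch of the kind of validation an `Exact { values, lambdas }` signature could perform on behalf of implementors. The `Arg` enum and the `check_exact_arity` function are hypothetical stand-ins written for this illustration; they are not DataFusion types, and the real check may be structured differently.

```rust
/// Hypothetical stand-in for the kind of a higher-order function argument.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Arg {
    Value,
    Lambda,
}

/// Hypothetical check: succeed only when the call has exactly `values`
/// value arguments and `lambdas` lambda arguments.
fn check_exact_arity(args: &[Arg], values: usize, lambdas: usize) -> Result<(), String> {
    let got_values = args.iter().filter(|a| **a == Arg::Value).count();
    let got_lambdas = args.len() - got_values;
    if got_values == values && got_lambdas == lambdas {
        Ok(())
    } else {
        Err(format!(
            "expected {values} value(s) and {lambdas} lambda(s), got {got_values} and {got_lambdas}"
        ))
    }
}
```

With such a check done up front, a call shaped like `array_transform(list, lambda)` passes with `values = 1, lambdas = 1`, while a call missing its lambda is rejected before any type coercion runs.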
* feat(functions-nested): add array_filter higher-order function (apache#21895)

Partially addresses apache#14509: implements `array_filter` / `list_filter`.

`array_transform` (apache#21679) added the first `HigherOrderUDF`. `array_filter` is the natural companion: filter array elements with a boolean lambda, matching Spark `filter` / DuckDB `list_filter` semantics.

- New `HigherOrderUDF` `ArrayFilter` (`array_filter` / `list_filter` alias)
- Boolean lambda per element; `true` keeps, `false`/null drops (matches Spark semantics)
- Handles `List`, `LargeList`, sliced arrays, null sublists
- Scalar predicate short-circuit (`x -> true` / `x -> false`)
- No-copy fast path when nothing is filtered (skips `arrow::compute::filter`)
- Shared HOF helpers extracted from `array_transform` into a common module (`value_lambda_pair`, `coerce_single_list_arg`, `single_list_lambda_parameters`, `extract_list_values`)
- Shared unit test helpers for higher-order function tests
- Unit tests: basic filter, multiple sublists, sliced arrays, null sublists, all-filtered-out, nothing-filtered (fast path), scalar true/false predicates
- SQL logic tests in `array_filter.slt`: filter variants, `array_filter` + `array_transform` combinations, error cases

User-facing change: `array_filter(array, lambda)` and its alias `list_filter(array, lambda)` are now available as SQL functions.

(cherry picked from commit d91dcb7)

* trigger CI

---------

Co-authored-by: Lavkesh Lahngir <lavkesh.lahngir@datadoghq.com>
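The keep/drop rule listed above (`true` keeps, `false`/null drops) can be sketched on plain Rust slices. This is only an illustration of the semantics for a single sublist; `filter_keep_true` is a hypothetical helper, and the actual implementation works on Arrow arrays rather than on `Vec`s.

```rust
/// Keep an element only when its predicate result is `Some(true)`.
/// `Some(false)` and `None` (standing in for SQL NULL) both drop it.
fn filter_keep_true<T: Clone>(values: &[T], predicate: &[Option<bool>]) -> Vec<T> {
    values
        .iter()
        .zip(predicate)
        .filter(|(_, keep)| **keep == Some(true))
        .map(|(value, _)| value.clone())
        .collect()
}
```

Treating a null predicate result the same as `false` is what makes the result match the Spark `filter` behaviour the description refers to.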
This is a clean version of #18921 to make it easier to review
This is a breaking change due to adding variants to the `Expr` enum, new methods on the traits `Session`, `FunctionRegistry` and `ContextProvider`, and a new arg on `TaskContext::new`.

This PR adds support for lambdas and the `array_transform` function used to test the lambda implementation.

Example usage:

Note: column capture has been removed for now and will be added in a follow-up PR, see #21172
Some comments on code snippets of this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudo code
3 new `Expr` variants are added: `HigherOrderFunction`, owning a new trait `HigherOrderUDF`, which is like a `ScalarFunction`/`ScalarUDFImpl` with support for lambdas; `Lambda`, for the lambda body and its parameter names; and `LambdaVariable`, which is like `Column` but for lambda parameters.

Their logical representations:
The example would be planned into a tree like this:
The physical counterparts definition:
Note: For those who primarily want to check if this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of `HigherOrderUDF` and the `array_transform` implementation of the relevant `HigherOrderUDF` methods, collapsed due to their size.

The added `HigherOrderUDF` trait is almost a clone of `ScalarUDFImpl`, with the exception of:

- `return_field_from_args` and `invoke_with_args`, where now `args.args` is a list of enums with two variants, `Value` or `Lambda`, instead of a list of values
- `lambda_parameters`, which returns a `Field` for each parameter supported for every lambda argument, based on the `Field` of the non-lambda arguments
- `return_field` and the deprecated ones `is_nullable` and `display_name`

HigherOrderUDF
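As a rough illustration of the two-variant argument enum described in the list above, the sketch below defines a simplified `ValueOrLambda` and a helper that expects exactly one value followed by one lambda. It is a standalone approximation for readers: the helper name `value_then_lambda` and the `String` error type are assumptions made here, not the PR's API.

```rust
use std::fmt::Debug;

/// Simplified stand-in for the argument enum: each argument is either
/// a regular value or a lambda.
#[derive(Debug)]
enum ValueOrLambda<V, L> {
    Value(V),
    Lambda(L),
}

/// Expect exactly `[Value, Lambda]`, in that order, and borrow both parts.
fn value_then_lambda<'a, V: Debug, L: Debug>(
    name: &str,
    args: &'a [ValueOrLambda<V, L>],
) -> Result<(&'a V, &'a L), String> {
    match args {
        [ValueOrLambda::Value(value), ValueOrLambda::Lambda(lambda)] => Ok((value, lambda)),
        other => Err(format!(
            "{name} expects a value followed by a lambda, got {other:?}"
        )),
    }
}
```

Because the enum is generic over `V` and `L`, the same shape can carry fields at planning time and evaluated values at execution time, which is how the description uses it for both `return_field_from_args` and `invoke_with_args`.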
array_transform lambda_parameters implementation
array_transform return_field_from_args implementation
array_transform invoke_with_args implementation
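The implementation's comments mention that the offsets of a sliced list must be adjusted before they can be paired with the sliced values. One plausible reading, sketched here on plain integers, is that the offsets are rebased so the first one becomes zero. `rebase_offsets` is a hypothetical helper for illustration only; it is an assumption about what `adjust_offsets_for_slice` does, not a copy of it.

```rust
/// Rebase list offsets so they index into a values buffer that was itself
/// sliced to start at the first offset.
fn rebase_offsets(offsets: &[i32]) -> Vec<i32> {
    match offsets.first() {
        Some(&first) => offsets.iter().map(|offset| offset - first).collect(),
        None => Vec::new(),
    }
}
```

For example, a slice whose offsets are `[3, 5, 9]` covers values `3..9` of the parent buffer; once the values are sliced to that range, the matching offsets are `[0, 2, 6]`.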
How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example
A `HigherOrderUDF`/`HigherOrderUDFImpl` pair, like `ScalarUDF`/`ScalarUDFImpl`, was not used because that split exists only to maintain backwards compatibility with the older API #8045
Why `LambdaVariable` and not `Column`:

Existing tree traversals that operate on columns would break if some column nodes referred to a lambda parameter and not a real column. In the example query, projection pushdown would try to push the lambda parameter "v", which won't exist in table "t".
Example of code of another traversal that would break:
Furthermore, the implementation of `ExprSchemable` and `PhysicalExpr::return_field` for `Column` expects that the schema it receives as an argument contains an entry for its name, which is not the case for lambda parameters.

By including a `FieldRef` on `LambdaVariable`, resolved at construction time in the SQL planner, `ExprSchemable` and `PhysicalExpr::return_field` simply return its own `Field`:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
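The benefit of a dedicated variant can be seen in a toy traversal: a column collector written before lambdas existed keeps working, because lambda parameters never appear as `Column` nodes. The `Expr` enum and `collect_used_columns` below are hypothetical, heavily simplified stand-ins and not the DataFusion types.

```rust
use std::collections::HashSet;

/// Hypothetical, much-simplified expression tree.
#[allow(dead_code)]
enum Expr {
    Column { index: usize },
    LambdaVariable { name: String },
    Binary(Box<Expr>, Box<Expr>),
}

/// Collect the indices of real columns referenced by `expr`.
fn collect_used_columns(expr: &Expr, used: &mut HashSet<usize>) {
    match expr {
        Expr::Column { index } => {
            used.insert(*index);
        }
        // Lambda parameters are their own variant, so they are never
        // mistaken for a column of the input schema.
        Expr::LambdaVariable { .. } => {}
        Expr::Binary(left, right) => {
            collect_used_columns(left, used);
            collect_used_columns(right, used);
        }
    }
}
```

With a flag on `Column` instead, every such traversal would need an extra check to stay correct, which is the cost the discarded alternatives describe.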
Possible alternatives discarded due to complexity, requiring downstream changes and implementation size:
How minimize_join_filter would look:
How minimize_join_filter would look:
For any given HigherOrderFunction found during the traversal, a new schema is created for each lambda argument that contains its parameters, returned from HigherOrderUDF::lambda_parameters
How it would look: